package indi.mozping;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

/**
 * Demo Kafka consumer: subscribes to the {@code test} topic, polls it in a background
 * thread, logs every record received and commits offsets synchronously after each poll.
 *
 * <p>Auto-commit is disabled and {@code max.poll.records} is set to 1, so offsets are
 * committed by this class after each poll, which returns at most one record.
 *
 * <p>To try several consumers in the same group, create one {@link KafkaConsumer} per
 * thread and hand each to its own thread; a single consumer instance must not be used by
 * two threads at once.
 *
 * <p>Created 2019/4/16 19:35.
 *
 * @author mozping
 */
public class MyConsumer {

    static final Logger log = LoggerFactory.getLogger(MyConsumer.class);

    /** Address of the Kafka broker used for the initial connection. */
    private static final String BOOTSTRAP_SERVERS = "192.168.12.39:9092";

    /** Consumer group used for partition assignment and offset commits. */
    private static final String GROUP_ID = "test";

    /** Topic to consume from. */
    private static final String TOPIC = "test";

    /** Maximum time a single poll waits for records, in milliseconds. */
    private static final long POLL_TIMEOUT_MS = 3000L;

    /** Pause between two consecutive polls, in milliseconds. */
    private static final long PAUSE_BETWEEN_POLLS_MS = 100L;

    /** Maximum number of characters of a record value written to the log. */
    private static final int VALUE_PREVIEW_LENGTH = 10;

    /**
     * Creates the consumer, subscribes it to {@link #TOPIC} and starts the polling thread.
     *
     * @param args ignored
     * @throws InterruptedException never thrown by the current implementation; kept so the
     *                              method signature stays unchanged
     */
    public static void main(String[] args) throws InterruptedException {
        final KafkaConsumer<String, String> consumer1 =
                new KafkaConsumer<String, String>(buildConsumerProperties());
        consumer1.subscribe(Arrays.asList(TOPIC));

        new Thread(new Runnable() {
            @Override
            public void run() {
                consumeLoop(consumer1);
            }
        }, "consumer1").start();
    }

    /**
     * Builds the configuration shared by the consumers of this demo.
     *
     * @return a new, independent {@link Properties} instance
     */
    private static Properties buildConsumerProperties() {
        Properties props = new Properties();
        // Broker(s) used to bootstrap the connection to the cluster.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // subscribe() relies on group management and commitSync() stores offsets per group,
        // so a group id must be configured.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        // Offsets are committed explicitly in consumeLoop().
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "90000");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Polls the given consumer until the calling thread is interrupted, logging each record
     * and committing offsets after every poll. The consumer is always closed on exit, also
     * when polling or committing throws.
     *
     * @param consumer an already subscribed consumer, used only by the calling thread
     */
    private static void consumeLoop(KafkaConsumer<String, String> consumer) {
        boolean interrupted = false;
        try {
            while (!interrupted) {
                // NOTE(review): poll(long) is deprecated in newer kafka-clients releases in
                // favour of poll(Duration); kept because the client version is not visible here.
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                long begin = System.currentTimeMillis();
                int num = 0;
                for (ConsumerRecord<String, String> record : records) {
                    num++;
                    // SLF4J substitutes "{}" placeholders, not printf-style "%s"/"%d".
                    log.info("partition = {}, offset = {}, value = {}, time = {}",
                            record.partition(),
                            record.offset(),
                            preview(record.value()),
                            new Date());
                }
                consumer.commitSync();
                long elapsedMs = System.currentTimeMillis() - begin;
                log.info("消费了{}条消息，耗时{} ms, time = {}", num, elapsedMs, new Date());
                try {
                    Thread.sleep(PAUSE_BETWEEN_POLLS_MS);
                } catch (InterruptedException e) {
                    // Stop consuming; the interrupt flag is restored after the consumer is
                    // closed so that close() does not run on an interrupted thread.
                    interrupted = true;
                }
            }
        } finally {
            try {
                consumer.close();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Returns at most the first {@link #VALUE_PREVIEW_LENGTH} characters of a record value.
     *
     * @param value the record value, possibly {@code null} or shorter than the limit
     * @return the shortened value, or {@code value} itself when it is {@code null} or
     *         already short enough
     */
    private static String preview(String value) {
        if (value == null || value.length() <= VALUE_PREVIEW_LENGTH) {
            return value;
        }
        return value.substring(0, VALUE_PREVIEW_LENGTH);
    }

}
